package com.atguigu.gmall.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafka;
import com.atguigu.gmall.realtime.utils.MyKafkaPro;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;

//消费从从kafka的ods_base_log话题中的日志，并且分成3个流，写入到kafka的各个topic。
public class BaseLogApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        //设置checkpoint的的配置
        //设置没5s保存一次checkpoint,精准一次消费
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        //checkpoint必须在一分钟内完成，否则舍弃,检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(6000);
        //checkpoint保存的位置，及其操作用户
        env.setStateBackend(new FsStateBackend("hdfs://hadoop104:8020/gmall/flink/checkpoint1"));
        System.setProperty("HADOOP_USER_NAME","atguigu");
        //重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));


        //消费者主题
        String topic="ods_base_log";
        //消费者组
        String groupId="BaseLogApp";




        //TODO 1 ods_base_log作为source输入源
        //TODO 1.1 连接kafka,封装一个kafka的工具类,获取连接.
        FlinkKafkaConsumer<String> kafkaSource = MyKafka.getFlinkKafkaConsumer(topic,groupId);
        //通过kafkaSource读取数据，封装成一个流
        DataStreamSource<String> kafkaDStream = env.addSource(kafkaSource);
        //流测试，查看是否话题能被消费.
//        kafkaDStream.print();

        //将String类型的数据转换为JSON对象.
     SingleOutputStreamOperator<JSONObject> jsonObjectDs = kafkaDStream.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
                return JSON.parseObject(value);
            }
        });

        //TODO 2 将数据进行修复，是否是新用户
        //TODO 2.1先对数据进行分组
        KeyedStream<JSONObject, String> kedByDS = jsonObjectDs.keyBy(
                r -> r.getJSONObject("common").getString("mid")
        );
        //TODO 2.2对分组后的数据进行处理,只是对原来的数据，进行修改错误的，输入什么，返回什么，一样为JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjectFlagDS = kedByDS.map(
                new RichMapFunction<JSONObject, JSONObject>() {
                    //定义状态
                    ValueState<String> firstVisitDataState;
                    //时间格式
                    SimpleDateFormat dfs;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //初始化一次时间和时间格式
                        firstVisitDataState = getRuntimeContext().getState(new ValueStateDescriptor<String>("firstVisitData", String.class));
                        dfs = new SimpleDateFormat("yyyyMMdd");
                    }

                    //如果同一个id多个时间戳，那么应该就不是新用户
                    //时间戳，就flink的状态保存起来
                    @Override
                    public JSONObject map(JSONObject value) throws Exception {
                        //获取is_new=1的新用户，只处理为1的新用户。
                        String is_new = value.getJSONObject("common").getString("is_new");
                        //是1才来进行修复
                        if (is_new.equals("1")) {
                            //获取前设备的时间状态,如果有就是登录过，没有null,就是新的updata给状态里.
                            String firstVisitData = firstVisitDataState.value();
                            //获取他的访问时间
                            Long ts = value.getLong("ts");
                            String tsFormat = dfs.format(ts);
                            //如果有状态，就证明这个数据又来了一次，第二次过来.就是老用户，就是从头到尾，状态只有一个的就是新，再次来里面有状态就是老.
                            if (firstVisitData != null && firstVisitData.length() > 0) {
                                //0就是老户。
                                value.getJSONObject("common").put("is_new", "0");
                            } else {
                                //就是新用户，将ts更新进去,证明现在是第一次来.
                                firstVisitDataState.update(tsFormat);
                            }

                        }

                        return value;
                    }
                }
        );
//        jsonObjectFlagDS.print(">>>");

        //测试是否有修正的数据.
        //jsonObjectFlagDS.print("修正============================>");
        //TODO 3 将数据进行分流,启动日志->测输出流, 曝光数据->测输出流，页面日志->主流
        //需要进行测输出流，所以需要用到上下问，需要process,需要向kafka写数据所以需要转换为字符串.
        //启动日志测输出流
        OutputTag<String> outputStart=new OutputTag<String>("outputStart"){};
        OutputTag<String> outputDisPlay=new OutputTag<String>("outputDisPlay"){};
        SingleOutputStreamOperator<String> splitDStream = jsonObjectFlagDS.process(
                new ProcessFunction<JSONObject, String>() {
                    @Override
                    public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
                        //除了启动日志外，其他全部为页面数据.
                        JSONObject startJsonObject = jsonObject.getJSONObject("start");
                        //转化为String
                        String jsonObjectString = jsonObject.toJSONString();

                        //都属于页面数据输入到主流
                        if (startJsonObject != null && startJsonObject.size() > 0) {
                            //输出为侧数据流.
                            context.output(outputStart, jsonObjectString);
                        } else {
                           // System.out.println("====>" + jsonObject.getJSONObject("common").getString("is_new"));
                            //都属于页面数据输入到主流
                            collector.collect(jsonObjectString);
                            //分为曝光数据和页面
                            //判断曝光页面
                            JSONArray dispalyJsonObjectArr = jsonObject.getJSONArray("displays");

                            if (dispalyJsonObjectArr != null && dispalyJsonObjectArr.size() > 0) {
                                String page_id = jsonObject.getJSONObject("page").getString("page_Id");
                                //曝光数据侧输出流
                                //抓取page_id，放入到每个display
                                for (int i = 0; i < dispalyJsonObjectArr.size(); i++) {
                                    JSONObject jsonObject1 = dispalyJsonObjectArr.getJSONObject(i);
                                    jsonObject1.put("page_id", page_id);
                                    //输出侧输出流
                                    context.output(outputDisPlay, jsonObject1.toString());
                                }


                            }
                        }

                    }
                }
        );
        //获取测输出流
        //splitDStream.getSideOutput(outputStart).print("启动日志侧输出流》》》》》》》》》》》》》");
        //splitDStream.getSideOutput(outputDisPlay).print("曝光日志侧输出流###############################");
        splitDStream.print("主流*********************************");

        //TODO 4将数据写入kafka,也是封装一个生产者工具类
        DataStream<String> startsideOutput = splitDStream.getSideOutput(outputStart);
        DataStream<String> distplaysideOutput = splitDStream.getSideOutput(outputDisPlay);

        String TOPIC_StART="dwd_topic_start";
        String TOPIC_DISPLAY="dwd_topic_dispaly";
        String TOPIC_PAGE="dwd_topic_page";

        FlinkKafkaProducer<String> topicStart = MyKafkaPro.getFlinkKafkaProducer(TOPIC_StART);
        FlinkKafkaProducer<String> topicDispaly = MyKafkaPro.getFlinkKafkaProducer(TOPIC_DISPLAY);
        FlinkKafkaProducer<String> topicPage = MyKafkaPro.getFlinkKafkaProducer(TOPIC_PAGE);

        //splitDStream.print("###");
        startsideOutput.addSink(topicStart);
        distplaysideOutput.addSink(topicDispaly);
        splitDStream.addSink(topicPage);

        env.execute();
    }
}
